package com.cloudansys.core.flink.function;

import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;

@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class TagStreamProcessFunc extends ProcessFunction<List<MultiDataEntity>, List<MultiDataEntity>> {

    private OutputTag<List<MultiDataEntity>> tag1;
    private OutputTag<List<MultiDataEntity>> tag2;
    private OutputTag<List<MultiDataEntity>> tag3;
    private OutputTag<List<MultiDataEntity>> tag4;
    private OutputTag<List<MultiDataEntity>> tag5;
    private OutputTag<List<MultiDataEntity>> tag6;

    /**
     * 把应变类型和其它类型的分为两道流
     */
    @Override
    public void processElement(List<MultiDataEntity> elements, Context context, Collector<List<MultiDataEntity>> out) throws Exception {
        String targetType = elements.get(0).getTargetType();
        if (Const.TT_YB.equals(targetType)) {
            context.output(tag2, elements);
        } else if (Const.TT_WE.equals(targetType)) {
            context.output(tag3, elements);
        } else if (Const.TT_WY.equals(targetType)) {
            context.output(tag4, elements);
        } else if (Const.TT_ZD.equals(targetType)) {
            context.output(tag5, elements);
        } else if (Const.TT_QJ.equals(targetType)) {
            context.output(tag6, elements);
        } else {
            context.output(tag1, elements);
        }
    }

}
